[[events]]
= Application Events

The following Spring application events are published by listener containers and their consumers:

* `ConsumerStartingEvent`: published when a consumer thread is first started, before it starts polling.
* `ConsumerStartedEvent`: published when a consumer is about to start polling.
* `ConsumerFailedToStartEvent`: published if no `ConsumerStartingEvent` is published within the `consumerStartTimeout` container property.
This event might signal that the configured task executor has insufficient threads to support the containers it is used in and their concurrency.
An error message is also logged when this condition occurs.
* `ListenerContainerIdleEvent`: published when no messages have been received in `idleEventInterval` (if configured).
* `ListenerContainerNoLongerIdleEvent`: published when a record is consumed after previously publishing a `ListenerContainerIdleEvent`.
* `ListenerContainerPartitionIdleEvent`: published when no messages have been received from that partition in `idlePartitionEventInterval` (if configured).
* `ListenerContainerPartitionNoLongerIdleEvent`: published when a record is consumed from a partition that has previously published a `ListenerContainerPartitionIdleEvent`.
* `NonResponsiveConsumerEvent`: published when the consumer appears to be blocked in the `poll` method.
* `ConsumerPartitionPausedEvent`: published by each consumer when a partition is paused.
* `ConsumerPartitionResumedEvent`: published by each consumer when a partition is resumed.
* `ConsumerPausedEvent`: published by each consumer when the container is paused.
* `ConsumerResumedEvent`: published by each consumer when the container is resumed.
* `ConsumerStoppingEvent`: published by each consumer just before stopping.
* `ConsumerStoppedEvent`: published after the consumer is closed.
See xref:kafka/thread-safety.adoc[Thread Safety].
* `ConsumerRetryAuthEvent`: published when authentication or authorization of a consumer fails and is being retried.
* `ConsumerRetryAuthSuccessfulEvent`: published when authentication or authorization has been retried successfully. Can only occur when there has been a `ConsumerRetryAuthEvent` before.
* `ContainerStoppedEvent`: published when all consumers have stopped.

IMPORTANT: By default, the application context's event multicaster invokes event listeners on the calling thread.
If you change the multicaster to use an async executor, you must not invoke any `Consumer` methods when the event contains a reference to the consumer.

The `ListenerContainerIdleEvent` has the following properties:

* `source`: The listener container instance that published the event.
* `container`: The listener container or the parent listener container, if the source container is a child.
* `id`: The listener ID (or container bean name).
* `idleTime`: The time the container had been idle when the event was published.
* `topicPartitions`: The topics and partitions that the container was assigned at the time the event was generated.
* `consumer`: A reference to the Kafka `Consumer` object.
For example, if the consumer's `pause()` method was previously called, you can call `resume()` when the event is received.
* `paused`: Whether the container is currently paused.
See xref:kafka/pause-resume.adoc[Pausing and Resuming Listener Containers] for more information.

The `ListenerContainerNoLongerIdleEvent` has the same properties, except `idleTime` and `paused`.


The `ListenerContainerPartitionIdleEvent` has the following properties:

* `source`: The listener container instance that published the event.
* `container`: The listener container or the parent listener container, if the source container is a child.
* `id`: The listener ID (or container bean name).
* `idleTime`: The time partition consumption had been idle when the event was published.
* `topicPartition`: The topic and partition that triggered the event.
* `consumer`: A reference to the Kafka `Consumer` object.
For example, if the consumer's `pause()` method was previously called, you can call `resume()` when the event is received.
* `paused`: Whether that partition consumption is currently paused for that consumer.
See xref:kafka/pause-resume.adoc[Pausing and Resuming Listener Containers] for more information.

The `ListenerContainerPartitionNoLongerIdleEvent` has the same properties, except `idleTime` and `paused`.


The `NonResponsiveConsumerEvent` has the following properties:

* `source`: The listener container instance that published the event.
* `container`: The listener container or the parent listener container, if the source container is a child.
* `id`: The listener ID (or container bean name).
* `timeSinceLastPoll`: The time that has elapsed since the container last called `poll()`.
* `topicPartitions`: The topics and partitions that the container was assigned at the time the event was generated.
* `consumer`: A reference to the Kafka `Consumer` object.
For example, if the consumer's `pause()` method was previously called, you can call `resume()` when the event is received.
* `paused`: Whether the container is currently paused.
See xref:kafka/pause-resume.adoc[Pausing and Resuming Listener Containers] for more information.

The `ConsumerPausedEvent`, `ConsumerResumedEvent`, and `ConsumerStoppingEvent` events have the following properties:

* `source`: The listener container instance that published the event.
* `container`: The listener container or the parent listener container, if the source container is a child.
* `partitions`: The `TopicPartition` instances involved.

The `ConsumerPartitionPausedEvent` and `ConsumerPartitionResumedEvent` events have the following properties:

* `source`: The listener container instance that published the event.
* `container`: The listener container or the parent listener container, if the source container is a child.
* `partition`: The `TopicPartition` instance involved.

The `ConsumerRetryAuthEvent` event has the following properties:

* `source`: The listener container instance that published the event.
* `container`: The listener container or the parent listener container, if the source container is a child.
* `reason`:
** `AUTHENTICATION` - the event was published because of an authentication exception.
** `AUTHORIZATION` - the event was published because of an authorization exception.

The `ConsumerStartingEvent`, `ConsumerStartedEvent`, `ConsumerFailedToStartEvent`, `ConsumerStoppedEvent`, `ConsumerRetryAuthSuccessfulEvent` and `ContainerStoppedEvent` events have the following properties:

* `source`: The listener container instance that published the event.
* `container`: The listener container or the parent listener container, if the source container is a child.

All containers (whether a child or a parent) publish `ContainerStoppedEvent`.
For a parent container, the source and container properties are identical.

The `ConsumerStoppedEvent` also has the following property:

* `reason`:
** `NORMAL` - the consumer stopped normally (container was stopped).
** `ERROR` - a `java.lang.Error` was thrown.
** `FENCED` - the transactional producer was fenced and the `stopContainerWhenFenced` container property is `true`.
** `AUTH` - an `AuthenticationException` or `AuthorizationException` was thrown and the `authExceptionRetryInterval` is not configured.
** `NO_OFFSET` - there is no offset for a partition and the `auto.offset.reset` policy is `none`.

You can use this event to restart the container after such a condition:

[source, java]
----
if (event.getReason().equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}
----

[[idle-containers]]
== Detecting Idle and Non-Responsive Consumers

While efficient, one problem with asynchronous consumers is detecting when they are idle.
You might want to take some action if no messages arrive for some period of time.

You can configure the listener container to publish a `ListenerContainerIdleEvent` when some time passes with no message delivery.
While the container is idle, an event is published every `idleEventInterval` milliseconds.

To configure this feature, set the `idleEventInterval` on the container.
The following example shows how to do so:

[source, java]
----
@Bean
public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer(
        ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}
----

The following example shows how to set the `idleEventInterval` for a `@KafkaListener`:

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}
----

In each of these cases, an event is published once per minute while the container is idle.

If, for some reason, the consumer `poll()` method does not exit, no messages are received and idle events cannot be generated (this was a problem with early versions of `kafka-clients` when the broker was not reachable).
In this case, the container publishes a `NonResponsiveConsumerEvent` if a poll does not return within `3x` the `pollTimeout` property.
By default, this check is performed once every 30 seconds in each container.
You can modify this behavior by setting the `monitorInterval` (default 30 seconds) and `noPollThreshold` (default 3.0) properties in the `ContainerProperties` when configuring the listener container.
The `noPollThreshold` should be greater than `1.0` to avoid getting spurious events due to a race condition.
Receiving such an event lets you stop the containers, thus waking the consumer so that it can stop.
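
The threshold arithmetic described above can be illustrated with a small, self-contained sketch.
It is a simplified model rather than the container's actual code: the class `NoPollCheck`, its method name, and the 5-second poll timeout used in `main` are assumptions made for illustration only.

```java
/** Simplified model of the non-responsive check; not the container's actual code. */
final class NoPollCheck {

    private NoPollCheck() {
    }

    /**
     * Returns true when the time since the last poll, divided by the poll
     * timeout, is strictly greater than the noPollThreshold multiplier.
     */
    static boolean isNonResponsive(long millisSinceLastPoll, long pollTimeoutMillis, float noPollThreshold) {
        float ratio = (float) millisSinceLastPoll / (float) pollTimeoutMillis;
        return ratio > noPollThreshold;
    }

    public static void main(String[] args) {
        long pollTimeoutMillis = 5_000L; // assumed value, for illustration only
        float noPollThreshold = 3.0f;    // the documented default multiplier

        // 10 seconds since the last poll is 2.0x the timeout: below the threshold.
        System.out.println(isNonResponsive(10_000L, pollTimeoutMillis, noPollThreshold));
        // 16 seconds since the last poll is 3.2x the timeout: above the threshold.
        System.out.println(isNonResponsive(16_000L, pollTimeoutMillis, noPollThreshold));
    }
}
```

With these assumed values, a poll that has been outstanding for more than 15 seconds would be reported the next time the monitor task runs, so the event can lag the threshold by up to one `monitorInterval`.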

Starting with version 2.6.2, if a container has published a `ListenerContainerIdleEvent`, it will publish a `ListenerContainerNoLongerIdleEvent` when a record is subsequently received.

[[event-consumption]]
== Event Consumption

You can capture these events by implementing `ApplicationListener` -- either a general listener or one narrowed to receive only a specific event type.
You can also use `@EventListener`, introduced in Spring Framework 4.2.

The next example combines `@KafkaListener` and `@EventListener` into a single class.
Note that the application listener receives events for all containers, so you may need to check the listener ID if you want to take specific action based on which container is idle.
You can also use the `@EventListener`+++'+++s `condition` for this purpose.

See xref:kafka/events.adoc[Application Events] for information about event properties.

The event is normally published on the consumer thread, so it is safe to interact with the `Consumer` object.

The following example uses both `@KafkaListener` and `@EventListener`:

[source, java]
----
public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
----

IMPORTANT: Event listeners see events for all containers.
Consequently, in the preceding example, we narrow the events received based on the listener ID.
Since containers created for a `@KafkaListener` support concurrency, the actual containers are named `id-n`, where `n` is a unique value for each instance.
That is why we use `startsWith` in the condition.
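
The following plain Java sketch shows why the prefix in the condition includes the trailing hyphen.
The class `ListenerIdMatch` and its method are hypothetical helpers that exist only for this illustration. They are not part of the framework.

```java
/** Hypothetical helper that mirrors the startsWith check used in the SpEL condition. */
final class ListenerIdMatch {

    private ListenerIdMatch() {
    }

    /**
     * Returns true when containerId has the form "listenerId-n", which is how
     * the child containers of a listener with the given id are named.
     */
    static boolean belongsTo(String listenerId, String containerId) {
        return containerId.startsWith(listenerId + "-");
    }

    public static void main(String[] args) {
        System.out.println(belongsTo("qux", "qux-0"));  // child container of listener "qux"
        System.out.println(belongsTo("qux", "quxx-0")); // child container of a different listener
    }
}
```

Without the hyphen, a check such as `startsWith("qux")` would also match the containers of a listener whose ID merely begins with the same characters.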

CAUTION: If you wish to use the idle event to stop the listener container, you should not call `container.stop()` on the thread that calls the listener.
Doing so causes delays and unnecessary log messages.
Instead, you should hand off the event to a different thread that can then stop the container.
Also, you should not `stop()` the container instance if it is a child container.
You should stop the concurrent container instead.
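
One way to hand off the stop to a different thread is sketched below in plain Java.
The `Stoppable` interface is a hypothetical stand-in for the parent (concurrent) listener container, and the class name, executor setup, and thread name are assumptions. In a real application, you would likely pass the parent container obtained from the event and might use an existing task executor instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch of stopping a container on a thread other than the one that delivered the event. */
final class StopHandOff {

    /** Hypothetical stand-in for the parent (concurrent) listener container. */
    interface Stoppable {
        void stop();
    }

    // A single daemon thread dedicated to stop requests, so the calling thread never blocks in stop().
    private final ExecutorService stopper = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "container-stopper");
        thread.setDaemon(true);
        return thread;
    });

    /** Intended to be called from the event listener; returns without waiting for the stop to finish. */
    CompletableFuture<Void> stopLater(Stoppable parentContainer) {
        return CompletableFuture.runAsync(parentContainer::stop, this.stopper);
    }

    /** Releases the helper thread when it is no longer needed. */
    void shutdown() {
        this.stopper.shutdown();
    }
}
```

An event listener would call `stopLater(...)` with the parent container and return immediately. The returned future is only needed if something has to wait for the stop to complete.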

[[current-positions-when-idle]]
=== Current Positions when Idle

Note that you can obtain the current positions when idle is detected by implementing `ConsumerSeekAware` in your listener.
See `onIdleContainer()` in xref:kafka/seek.adoc[seek].

